package com.ricky.flow;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Demonstrates the producer/consumer (publish-subscribe) API added in Java 9,
 * {@code java.util.concurrent.Flow}.
 *
 * <p>The publisher and subscriber classes used here follow the official usage examples
 * (the ones shown in the JDK source documentation).
 */
public class FlowDemo {
    /**
     * Subscribes one subscriber to a {@code OneShotPublisher} twice and waits for the
     * asynchronous delivery to finish.
     *
     * <p>The first subscription requests items and prints a message when {@code true} is
     * delivered. The second subscription checks whether the publisher can be consumed more
     * than once.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        OneShotPublisher publisher = new OneShotPublisher();

        // Typed consumer: items are Boolean, so no raw types or casts are needed.
        // A null item takes the "other" branch, exactly as the former instanceof check did.
        Consumer<Boolean> onItem = started -> {
            if (started == null) {
                System.out.println("其他非boolean情况");
            } else if (started) {
                System.out.println("开启任务执行");
            }
        };
        SampleSubscriber<Boolean> subscriber = new SampleSubscriber<>(10, onItem);

        publisher.subscribe(subscriber);
        publisher.subscribe(subscriber); // check whether a second subscription is accepted

        // The publisher delivers on ForkJoinPool.commonPool(), whose workers are daemon
        // threads, so main must not return before the submitted task has run. Waiting for
        // the pool to become quiescent is deterministic, unlike a fixed short sleep.
        boolean drained = ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
        if (!drained) {
            System.err.println("Timed out waiting for the publisher's task to finish");
        }
    }

}

/**
 * A one-shot publisher: it accepts a single subscriber and emits a single item.
 *
 * <p>Role in the Flow API: the producer (publisher).
 */
class OneShotPublisher implements Publisher<Boolean> {
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
    private boolean subscribed; // false by default; becomes true after the first subscription

    /**
     * Registers a subscriber.
     *
     * <p>The first subscriber receives a new {@link OneShotSubscription} through
     * {@code onSubscribe}. Every later subscriber is rejected by calling its
     * {@code onError} with an {@link IllegalStateException}.
     *
     * @param subscriber the subscriber to register
     * @throws NullPointerException if {@code subscriber} is null
     */
    @Override
    public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
        // Reject null before touching any state, so that a failed call does not use up
        // the single subscription this publisher allows.
        if (subscriber == null) {
            throw new NullPointerException("subscriber");
        }
        if (subscribed) {
            // Only one subscription is allowed.
            subscriber.onError(
                    new IllegalStateException("OneShotPublisher accepts only one subscriber"));
        } else {
            subscribed = true; // mark the publisher as consumed
            subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
        }
    }

    /**
     * The link between the publisher and its subscriber.
     *
     * <p>Role in the Flow API: the subscription. The mutable fields are only accessed from
     * the synchronized {@code request} and {@code cancel} methods.
     */
    static class OneShotSubscription implements Subscription {
        private final Subscriber<? super Boolean> subscriber;
        private final ExecutorService executor;
        private Future<?> future; // kept to allow cancellation
        private boolean completed;

        OneShotSubscription(Subscriber<? super Boolean> subscriber,
                            ExecutorService executor) {
            this.subscriber = subscriber;
            this.executor = executor;
        }

        /**
         * Handles a demand signal from the subscriber.
         *
         * <p>The first non-zero request completes the subscription: a negative value
         * schedules {@code onError} with an {@link IllegalArgumentException}, a positive
         * value schedules {@code onNext(true)} followed by {@code onComplete}. A zero
         * request, or any request after completion or cancellation, is ignored.
         *
         * @param n the number of items requested
         */
        @Override
        public synchronized void request(long n) {
            System.out.println("请求："+completed);
            if (n == 0 || completed) {
                return;
            }
            completed = true;
            if (n < 0) {
                IllegalArgumentException ex =
                        new IllegalArgumentException("negative subscription request: " + n);
                executor.execute(() -> subscriber.onError(ex));
            } else {
                future = executor.submit(() -> {
                    subscriber.onNext(Boolean.TRUE);
                    subscriber.onComplete();
                });
            }
        }

        /**
         * Cancels the subscription: later requests are ignored and a delivery task that
         * was already submitted is cancelled without interrupting it.
         */
        @Override
        public synchronized void cancel() {
            completed = true;
            if (future != null) {
                future.cancel(false);
            }
        }
    }
}

/**
 * A simple subscriber that requests items in batches and hands each item to a consumer.
 *
 * <p>Role in the Flow API: the consumer (receiver).
 *
 * @param <T> the type of the items received
 */
class SampleSubscriber<T> implements Subscriber<T> {
    final Consumer<? super T> consumer;
    Subscription subscription;
    final long bufferSize;
    long count;

    /**
     * @param bufferSize the number of items requested when the subscription starts
     * @param consumer   the callback that receives every delivered item
     */
    SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
        this.bufferSize = bufferSize;
        this.consumer = consumer;
    }

    /** Returns the buffer size minus half of it (integer division). */
    private long refillSize() {
        return bufferSize - bufferSize / 2;
    }

    /**
     * Stores the subscription, arms the re-request countdown and requests
     * {@code bufferSize} items.
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        // Re-request once half of the buffer has been consumed.
        this.count = refillSize();
        this.subscription = subscription;
        subscription.request(bufferSize);
    }

    /**
     * Counts the item, requests more when the countdown reaches zero, then passes the
     * item to the consumer.
     */
    @Override
    public void onNext(T item) {
        count--;
        if (count <= 0) {
            count = refillSize();
            subscription.request(count);
        }
        consumer.accept(item);
    }

    /** Prints the stack trace of the reported error. */
    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    /** Prints a completion message. */
    @Override
    public void onComplete() {
        System.out.println("hi,ricky bai,your task complete");
    }
}


/**
 * A subscriber without flow control: it asks for as many items as the publisher can send.
 *
 * @param <T> the type of the items received
 */
class UnboundedSubscriber<T> implements Subscriber<T> {

    /** Requests {@code Long.MAX_VALUE} items, which is effectively unbounded demand. */
    @Override
    public void onSubscribe(Subscription subscription) {
        final long unbounded = Long.MAX_VALUE;
        subscription.request(unbounded);
    }

    /** Forwards the received item to {@link #use(Object)}. */
    @Override
    public void onNext(T item) {
        use(item);
    }

    /** Prints the stack trace of the reported error. */
    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    /** Does nothing when the publisher completes. */
    @Override
    public void onComplete() {
    }

    /** Hook invoked for every received item; the default implementation does nothing. */
    void use(T item) {
    }
}

